IPC机制: 进程之间的通信,指两个进程之间进行数据交换

1. JoinableQueue 队列 和 Queue 队列的区别

  • 生产者生产完数据后,执行 p.join() 方法,意思是等待消费者把所有数据都处理完
  • 消费者进程每次从 JoinableQueue  队列中获取值并且处理完后都要执行 q.task_done() 方法,告诉生产者进程这个数据处理完了

2. 使用 JoinableQueue 队列的注意事项

  • 所有消费者进程必须设置为守护进程
  • 主进程中所有生产者进程必须执行 .join() 方法, 等待生产者进程执行完毕

3.使用 JoinableQueue 队列的执行顺序:生产者生产的数据全部被消费完 -> 生产者进程结束 -> 主进程代码执行结束 -> 消费者守护进程结束

import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue


# 生产者
def producer(q, food):
    for i in range(5):
        q.put('%s-%s' % (i, food))
    q.join()  # 等待 消费者进程 把所有的数据处理完


# 消费者
def consumer(q, name):
    while True:
        time.sleep(random.random())
        food = q.get()
        print('%s,吃了%s' % (name, food))
        q.task_done()  # 告诉生产者进程我处理完了这一个数据


if __name__ == '__main__':
    q = JoinableQueue()
# 生产者进程
    p1 = Process(target=producer, args=(q, '苹果'))
    p1.start()
    p2 = Process(target=producer, args=(q, '西瓜'))
    p2.start()

# 消费者进程 -> 所有的消费者进程都要开启守护进程,当主进程的代码执行结束后,消费者进程也随之结束,里面的循环也不会被执行
    c1 = Process(target=consumer, args=(q, 'Kevin'))
    c1.daemon = True
    c1.start()
    c2 = Process(target=consumer, args=(q, 'Yeung'))
    c2.daemon = True
    c2.start()
    c3 = Process(target=consumer, args=(q, 'Aimer'))
    c3.daemon = True
    c3.start()

# 主进程中所有的生产者进程都要执行 .join 方法,等待 xxx 生产者进程结束
    p1.join()  # 等待 p1 生产者进程结束
    p2.join()  # 等待 p2 生产者进程结束